package org.jadelyre.lyre;

import org.jadelyre.core.*;
import org.jadelyre.core.codec.Codec;
import org.jadelyre.core.codec.sun.SunCodec;
import org.jadelyre.core.exception.JadeLyreException;
import org.jadelyre.core.exception.LyreException;
import org.jadelyre.core.worker.Schedule;
import org.jadelyre.core.worker.WorkerFactory;
import org.jadelyre.utils.Assert;
import org.jadelyre.utils.StrUtils;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketOption;
import java.net.StandardSocketOptions;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.NotYetConnectedException;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
 * Client endpoint that opens an {@link AsynchronousSocketChannel} to the server described by a
 * {@link ClientConfig} and, when heartbeats are enabled in that config, schedules a recurring
 * {@link HeartbeatTask} for the resulting channel.
 */
public class LyreClient extends AbstractJadeLyre {

    /** Channel returned by the most recent successful {@link #connect()}; {@code null} before that. */
    private Channel channel;
    /** Socket opened by the most recent {@link #connect()} call; {@code null} until the first call. */
    private AsynchronousSocketChannel asynchronousSocketChannel;
    /**
     * While {@code true}, a connect attempt that does not complete within the configured timeout
     * makes {@link #connect()} throw. Starts as {@code true}; changed through
     * {@link #setFirstConnect(boolean)}.
     */
    private boolean firstConnect = true;
    /** Heartbeat schedule submitted by the latest {@link #connect()}; {@code null} if none was scheduled. */
    private Schedule heartbeatSchedule;

    /**
     * Creates a client that uses a {@link SunCodec}.
     *
     * @param clientConfig client configuration, must not be {@code null}
     * @param handler      message handler, must not be {@code null}
     * @throws JadeLyreException declared by the delegated constructor
     */
    public LyreClient(ClientConfig clientConfig, Handler handler) throws JadeLyreException {
        this(clientConfig, handler, new SunCodec());
    }

    /**
     * Creates a client with an explicit codec.
     *
     * @param clientConfig client configuration, must not be {@code null}
     * @param handler      message handler, must not be {@code null}
     * @param codec        codec handed to the superclass
     * @throws JadeLyreException declared by the superclass constructor
     */
    public LyreClient(ClientConfig clientConfig, Handler handler, Codec codec) throws JadeLyreException {
        super(handler, codec,clientConfig);
        // NOTE(review): these checks run after the superclass constructor has already received the
        // arguments; Java requires super(...) to be the first statement.
        Assert.notNull(clientConfig, "ClientConfig is required,must not be null");
        Assert.notNull(handler, "Handler is required,must not be null");
    }

    /**
     * Opens a socket, applies the configured socket options and optional local bind address,
     * starts an asynchronous connect to the configured server and waits up to the configured
     * connect timeout (in seconds) for the connection handler to count down the latch.
     *
     * @return the connected channel when the inherited {@code started} flag is set after the wait,
     *         otherwise {@code null}
     * @throws LyreException if the socket cannot be opened, configured or bound, if the waiting
     *                       thread is interrupted, or if the wait times out while
     *                       {@link #isFirstConnect()} is {@code true}
     */
    public Channel connect() {
        ClientConfig clientConfig = (ClientConfig) jadeLyreConfig;
        AsynchronousSocketChannel socketChannel = openSocketChannel(clientConfig);

        NodeAddress nodeAddress = new NodeAddress(clientConfig.getServerHost(), clientConfig.getServerPort());
        LyreChannel lyreChannel = new LyreChannel(this, socketChannel, nodeAddress);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        lyreChannel.getConnectionCompletionHandler().setCountDownLatch(countDownLatch);
        socketChannel.connect(new InetSocketAddress(clientConfig.getServerHost(), clientConfig.getServerPort()),
                lyreChannel, lyreChannel.getConnectionCompletionHandler());

        boolean completedInTime;
        try {
            completedInTime = countDownLatch.await(clientConfig.getConnectTimeout(), TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new LyreException(e);
        }
        if (!completedInTime && firstConnect) {
            throw new LyreException("could not connect remote server，check config or remote server started！");
        }
        // "started" is inherited and is not assigned in connect(); presumably the connection
        // completion handler sets it on success — verify against that handler.
        if (!started) {
            return null;
        }
        addShutdownHook();
        heartbeat(lyreChannel);
        channel = lyreChannel;
        return channel;
    }

    /**
     * Opens a new socket in the inherited channel group, stores it in
     * {@link #asynchronousSocketChannel}, then applies socket options and the optional bind address.
     * If any of those steps fails the freshly opened socket is closed before the failure propagates,
     * so a failed attempt does not leave an open socket behind.
     *
     * @param clientConfig configuration supplying socket options and bind host/port
     * @return the opened and configured socket
     * @throws LyreException wrapping any {@link IOException} raised while opening, configuring or binding
     */
    private AsynchronousSocketChannel openSocketChannel(ClientConfig clientConfig) {
        AsynchronousSocketChannel socketChannel = null;
        boolean ready = false;
        try {
            socketChannel = AsynchronousSocketChannel.open(asynchronousChannelGroup);
            asynchronousSocketChannel = socketChannel;
            for (Map.Entry<SocketOption<?>, Object> entry : clientConfig.getSocketOptions().entrySet()) {
                // The map is keyed by SocketOption<?>; the value type is trusted to match the option.
                @SuppressWarnings("unchecked")
                SocketOption<Object> option = (SocketOption<Object>) entry.getKey();
                socketChannel.setOption(option, entry.getValue());
            }
            if (clientConfig.getBindPort() != null) {
                InetSocketAddress bindAddress = StrUtils.isBlank(clientConfig.getBindHost())
                        ? new InetSocketAddress(clientConfig.getBindPort())
                        : new InetSocketAddress(clientConfig.getBindHost(), clientConfig.getBindPort());
                socketChannel.bind(bindAddress);
            }
            ready = true;
            return socketChannel;
        } catch (IOException e) {
            throw new LyreException(e);
        } finally {
            if (!ready && socketChannel != null) {
                try {
                    socketChannel.close();
                } catch (IOException ignored) {
                    // Best effort: the original failure is already propagating to the caller.
                }
            }
        }
    }

    /**
     * Schedules a heartbeat task for the given channel when heartbeats are configured and enabled.
     * Any previously scheduled heartbeat is cancelled first.
     *
     * @param lyreChannel channel the heartbeat task is created for
     * @throws LyreException if heartbeats are enabled but the handler provides no {@link HeartbeatHandler}
     */
    private void heartbeat(LyreChannel lyreChannel) {
        ClientConfig clientConfig = (ClientConfig) jadeLyreConfig;
        HeartbeatConfig heartbeatConfig = clientConfig.getHeartbeatConfig();
        if (heartbeatConfig == null || !heartbeatConfig.isEnable()) {
            return;
        }
        HeartbeatHandler heartbeatHandler = handler.getHeartbeatHandler();
        if (heartbeatHandler == null) {
            throw new LyreException("HeartbeatHandler not been config");
        }
        // On reconnect after a disconnect, the previous heartbeat task must be cancelled.
        if (heartbeatSchedule != null) {
            ScheduledFuture<?> previous = heartbeatSchedule.getFuture();
            // Guard against a schedule whose future is not available.
            if (previous != null && !previous.isCancelled()) {
                previous.cancel(true);
            }
        }
        int timeout = heartbeatConfig.getTimeout();
        HeartbeatTask heartbeatTask = new HeartbeatTask(heartbeatHandler, lyreChannel, timeout);
        // The timeout value (seconds) is passed for both numeric timing arguments of Schedule.
        Schedule schedule = new Schedule(Schedule.Type.schedule_fix_delay,
                heartbeatTask, TimeUnit.SECONDS, timeout, timeout, 0);
        scheduledWorker.work(schedule);
        heartbeatSchedule = schedule;
    }

    /**
     * Marks the client as stopped, closes the socket if one is open and shuts down the channel
     * group. Safe to call before {@link #connect()} or on a socket that never finished connecting.
     * The channel group is shut down even when closing the socket fails.
     *
     * @throws LyreException wrapping any {@link IOException} raised while shutting down or closing the socket
     */
    @Override
    public void stop() throws LyreException {
        started = false;
        try {
            AsynchronousSocketChannel socketChannel = asynchronousSocketChannel;
            if (socketChannel != null && socketChannel.isOpen()) {
                try {
                    socketChannel.shutdownInput().shutdownOutput();
                } catch (NotYetConnectedException ignored) {
                    // The socket was opened but never connected; there is nothing to shut down.
                } finally {
                    socketChannel.close();
                }
            }
        } catch (IOException e) {
            throw new LyreException(e);
        } finally {
            if (asynchronousChannelGroup != null) {
                asynchronousChannelGroup.shutdown();
            }
        }
    }

    /**
     * Sets whether a connect timeout should make {@link #connect()} throw.
     *
     * @param firstConnect new value of the first-connect flag
     */
    public void setFirstConnect(boolean firstConnect) {
        this.firstConnect = firstConnect;
    }

    /**
     * @return the current value of the first-connect flag
     */
    public boolean isFirstConnect() {
        return firstConnect;
    }

    /**
     * @return the channel from the most recent successful {@link #connect()}, or {@code null} if none
     */
    public Channel getChannel() {
        return channel;
    }

    /**
     * @return the most recently scheduled heartbeat, or {@code null} if none has been scheduled
     */
    public Schedule getHeartbeatSchedule() {
        return heartbeatSchedule;
    }
}
